#!/usr/bin/env bash
# local/compute-execute -- Executes a process locally using all available processors
# $ compute-execute input_sql=... command=... output_relation=...
#
# To limit the number of parallel processes, set the DEEPDIVE_NUM_PROCESSES
# environment or the 'deepdive.computers.local.num_processes' in
# computers.conf:
# $ export DEEPDIVE_NUM_PROCESSES=2
# $ compute-execute input_sql=... command=... output_relation=...
##
set -euo pipefail

# load compute configuration
eval "$(jq2sh <<<"$DEEPDIVE_COMPUTER_CONFIG" \
    num_processes='.num_processes' \
    num_parallel_unloads='.num_parallel_unloads' \
    num_parallel_loads='.num_parallel_loads' \
    named_pipes_dir='.named_pipes_dir' \
    #
)"
# respect the DEEPDIVE_NUM_PROCESSES environment
num_processes=${DEEPDIVE_NUM_PROCESSES:-${num_processes:-$(nproc --ignore=1)}}
num_parallel_unloads=${DEEPDIVE_NUM_PARALLEL_UNLOADS:-${num_parallel_unloads:-1}}
num_parallel_loads=${DEEPDIVE_NUM_PARALLEL_LOADS:-${num_parallel_loads:-1}}

# ensure mkfifo works on named_pipes_dir, respecting DEEPDIVE_NAMED_PIPES_DIR
named_pipes_dir=${DEEPDIVE_NAMED_PIPES_DIR:-${named_pipes_dir:-$PWD}}
can_create_named_pipes() {
    mkfifo "$pipes_dir"/.deepdive_named_pipe_probe.$$
    rm -f  "$pipes_dir"/.deepdive_named_pipe_probe.$$
}
{ pipes_dir=$named_pipes_dir    && can_create_named_pipes; } ||
{ pipes_dir=$HOME               && can_create_named_pipes; } ||
{ pipes_dir=${TMPDIR:=/tmp}     && can_create_named_pipes; } ||
    error "None of the following paths allow creation of named pipes:" \
        "  $named_pipes_dir" \
        "  $HOME" \
        "  $TMPDIR" \
        "Please set DEEPDIVE_NAMED_PIPES_DIR to a writable path in a filesystem that allows mkfifo" \
        #

# declare all input arguments
declare -- "$@"

# create a subdirectory for collocating named pipes
pipes=$(mktemp -d "$pipes_dir/deepdive-compute-execute.XXXXXXX")
trap 'rm -rf "$pipes"' EXIT

# show configuration
echo "Executing with the following configuration:"
echo " DEEPDIVE_NUM_PROCESSES=$num_processes"
echo " DEEPDIVE_NUM_PARALLEL_UNLOADS=$num_parallel_unloads"
echo " DEEPDIVE_NUM_PARALLEL_LOADS=$num_parallel_loads"
echo " DEEPDIVE_NAMED_PIPES_DIR=$pipes_dir"

# XXX there are conditional branches below depending on whether input_sql
# and/or output_relation is given, to support four use cases:
# 1) executing command while streaming data from/to the database
# 2) input-only command which has no output to the database and streams from the database
# 3) output-only command which has no input from the database and streams to the database
# 4) database-independent command which simply runs in parallel

# set up named pipes for parallel processes and make sure they are cleaned up upon exit
[[ -z $input_sql       ]] || for i in $(seq $num_processes); do mkfifo "$pipes"/process-$i.input ; done
[[ -z $output_relation ]] || for i in $(seq $num_processes); do mkfifo "$pipes"/process-$i.output; done
# now spawn processes attached to the named pipes in reverse order (from sink to source)
pids_command=() pids_load=() pids_unload=()

# spawn multiple processes attached to the pipes
if [[ -n $output_relation && -n $input_sql ]]; then # process with input from/output to database
    for i in $(seq $num_processes); do
        DEEPDIVE_CURRENT_PROCESS_INDEX=$i \
        bash -c "$command" <"$pipes"/process-$i.input >"$pipes"/process-$i.output &
        pids_command+=($!)
    done
elif [[ -n $input_sql ]]; then # input-only process
    for i in $(seq $num_processes); do
        DEEPDIVE_CURRENT_PROCESS_INDEX=$i \
        bash -c "$command" <"$pipes"/process-$i.input &
        pids_command+=($!)
    done
elif [[ -n $output_relation ]]; then # output-only process
    for i in $(seq $num_processes); do
        DEEPDIVE_CURRENT_PROCESS_INDEX=$i \
        bash -c "$command" >"$pipes"/process-$i.output &
        pids_command+=($!)
    done
else # neither output_relation nor input_sql specified
    for i in $(seq $num_processes); do
        DEEPDIVE_CURRENT_PROCESS_INDEX=$i \
        bash -c "$command" &
        pids_command+=($!)
    done
fi

if [[ -n $output_relation ]]; then
    # set up pipes for parallel loads
    for i in $(seq $num_parallel_loads); do mkfifo "$pipes"/output_computed-$i; done
    # use mkmimo again to merge outputs of multiple processes into a single stream
    mkmimo "$pipes"/process-*.output \> "$pipes"/output_computed-* &
    pids_load+=($!)
    # load the output data to the temporary table in the database
    deepdive-load "$output_relation" "$pipes"/output_computed-* &
    pids_load+=($!)
fi

if [[ -n $input_sql ]]; then
    # set up pipes for parallel unloads
    for i in $(seq $num_parallel_unloads); do mkfifo "$pipes"/feed_processes-$i; done
    # unload data from the database and pour into the pipes
    deepdive-db unload "$input_sql" "$DEEPDIVE_LOAD_FORMAT" "$pipes"/feed_processes-* &
    pids_unload+=($!)
    # use mkmimo to distribute input data to multiple processes
    mkmimo "$pipes"/feed_processes-* \> "$pipes"/process-*.input &
    pids_unload+=($!)
fi

# make sure all the child processes finishes without error
pids_all=()
[[ ${#pids_command[@]} -eq 0 ]] || pids_all+=("${pids_command[@]}")
[[ ${#pids_load[@]}    -eq 0 ]] || pids_all+=("${pids_load[@]}"   )
[[ ${#pids_unload[@]}  -eq 0 ]] || pids_all+=("${pids_unload[@]}" )
all_finishes_ok() {
    local what=$1; shift
    local pid=
    for pid in "$@"; do
        wait $pid || {
            exit_status=$?
            kill -TERM "${pids_all[@]}"
            error "${what:+$what: }PID $pid: finished with non-zero exit status ($exit_status)"
        }
    done
}
# the order of below is important. if we don't check unload first and unload fails, we are stuck
[[ ${#pids_unload[@]}  -eq 0 ]] || all_finishes_ok "deepdive-unload"                 "${pids_unload[@]}"
[[ ${#pids_command[@]} -eq 0 ]] || all_finishes_ok "command=$(escape4sh "$command")" "${pids_command[@]}"
[[ ${#pids_load[@]}    -eq 0 ]] || all_finishes_ok "deepdive-load"                   "${pids_load[@]}"
wait  # until everything is done ##############################################
